
feat: support file-level parquet row selections #22940

Open
haohuaijin wants to merge 8 commits into apache:main from haohuaijin:row-selection-access-plan

Conversation

@haohuaijin

Contributor

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

  • Add public ParquetRowSelection.
  • Add ParquetAccessPlan::try_new_from_overall_row_selection.
  • Allow Parquet opener setup to read either ParquetAccessPlan or ParquetRowSelection.
  • Reject using both extension types on the same file.
  • Validate that the selection row count matches the file row count.
  • Document the new extension path in ParquetSource.
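The "either extension, but not both" rule in the list above can be sketched as follows. This is a minimal, self-contained illustration and not the PR's opener code: `InitialPlan` and `resolve_extension` are hypothetical names, the generic parameters `P` and `S` stand in for `ParquetAccessPlan` and `ParquetRowSelection`, and the fallback of scanning everything when no extension is attached is an assumption.

```rust
/// Hypothetical stand-in for the plan the opener starts from.
/// `P` plays the role of `ParquetAccessPlan`, `S` of `ParquetRowSelection`.
#[derive(Debug, PartialEq, Eq)]
pub enum InitialPlan<P, S> {
    /// No extension attached (assumed default: scan every row group).
    ScanAll,
    /// The caller supplied a row-group level access plan.
    AccessPlan(P),
    /// The caller supplied a file-level row selection that still has to be
    /// split along row-group boundaries.
    RowSelection(S),
}

/// Picks the initial plan from the two optional per-file extensions and
/// rejects files that carry both of them.
pub fn resolve_extension<P, S>(
    access_plan: Option<P>,
    row_selection: Option<S>,
) -> Result<InitialPlan<P, S>, String> {
    match (access_plan, row_selection) {
        (Some(_), Some(_)) => Err(
            "a file may carry an access plan or a row selection, not both".to_string(),
        ),
        (Some(plan), None) => Ok(InitialPlan::AccessPlan(plan)),
        (None, Some(selection)) => Ok(InitialPlan::RowSelection(selection)),
        (None, None) => Ok(InitialPlan::ScanAll),
    }
}
```

Rejecting the ambiguous case up front avoids having to define a precedence between the two extension types.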

Are these changes tested?

Yes. This PR adds tests for:

  • converting a file-level selection into row-group access
  • rejecting invalid selection row counts
  • creating an initial plan from ParquetRowSelection
  • rejecting both ParquetAccessPlan and ParquetRowSelection on the same file
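The first two tested behaviors can be sketched without any Parquet dependency. In the block below, `Run` and `Access` are hypothetical stand-ins for `RowSelector` and `RowGroupAccess`, and `split_by_row_group` is an illustrative name, not the PR's function. It assumes the conversion rejects a selection whose total row count differs from the file's and collapses each row group to skip, scan, or a per-row selection.

```rust
/// Hypothetical stand-in for parquet's `RowSelector`: a run of rows that is
/// either skipped or selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Run {
    pub row_count: usize,
    pub skip: bool,
}

/// Hypothetical stand-in for `RowGroupAccess`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Access {
    Skip,
    Scan,
    Selection(Vec<Run>),
}

/// Splits a file-level selection along row-group boundaries.
pub fn split_by_row_group(
    selection: &[Run],
    row_group_rows: &[usize],
) -> Result<Vec<Access>, String> {
    // The selection must describe every row of the file exactly once.
    let selection_rows: usize = selection.iter().map(|r| r.row_count).sum();
    let file_rows: usize = row_group_rows.iter().sum();
    if selection_rows != file_rows {
        return Err(format!(
            "selection covers {selection_rows} rows but the file has {file_rows}"
        ));
    }

    // Empty runs carry no rows, so drop them before walking the selection.
    let mut runs = selection.iter().copied().filter(|r| r.row_count > 0);
    let mut current = runs.next();
    let mut plan = Vec::with_capacity(row_group_rows.len());

    for &rg_rows in row_group_rows {
        let mut remaining = rg_rows;
        let mut pieces = Vec::new();
        let (mut selected, mut skipped) = (0usize, 0usize);

        while remaining > 0 {
            let Some(run) = current else { break };
            // Take only the part of the run that fits in this row group and
            // keep the remainder for the next one.
            let take = run.row_count.min(remaining);
            current = if take < run.row_count {
                Some(Run { row_count: run.row_count - take, skip: run.skip })
            } else {
                runs.next()
            };
            remaining -= take;
            if run.skip {
                skipped += take;
            } else {
                selected += take;
            }
            pieces.push(Run { row_count: take, skip: run.skip });
        }

        plan.push(if selected == 0 {
            Access::Skip
        } else if skipped == 0 {
            Access::Scan
        } else {
            Access::Selection(pieces)
        });
    }
    Ok(plan)
}
```

For example, the selection "skip 3, select 5, skip 2" over row groups of 3, 4, and 3 rows yields `Skip` for the first group, `Scan` for the second, and a per-row selection ("select 1, skip 2") for the third. The same selection over row groups of 3 and 4 rows is rejected because 10 rows do not match 7.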

Are there any user-facing changes?

Yes. This adds a new public ParquetRowSelection type for callers that want to attach a file-level Parquet RowSelection to a PartitionedFile.

@github-actions github-actions Bot added the datasource Changes to the datasource crate label Jun 13, 2026

@kosiew kosiew left a comment

Contributor

@haohuaijin
Thanks for the PR. I do not see any blocking issues, just a few suggestions that could make the implementation a bit simpler and help protect the new extension behavior.

Comment thread datafusion/datasource-parquet/src/access_plan.rs Outdated
Comment thread datafusion/datasource-parquet/src/opener/mod.rs
Comment thread datafusion/datasource-parquet/src/opener/mod.rs
@github-actions github-actions Bot added the core Core DataFusion crate label Jun 18, 2026

@kosiew kosiew left a comment

Contributor

Thanks for the follow-up.

Comment thread datafusion/core/tests/parquet/external_access_plan.rs
selection: RowSelection,
row_group_meta_data: &[RowGroupMetaData],
) -> Result<Self> {
let selectors: Vec<RowSelector> = selection.into();

Contributor

The performance concern around RowSelection::split_off makes sense. That said, this manual current / leading / mixed cursor logic is a bit harder to audit, especially since the full selector vector is still materialized up front.

If we keep this version, it would be helpful to add a short comment or benchmark note explaining the measured reason for the manual approach. Otherwise, the earlier split_off shape feels easier to reason about because it directly captures the boundary-splitting invariant.

Contributor Author

@kosiew Thank you for the suggestion. I restructured the code; could you check again?

I benchmarked split_off against this PR by adding the bench.rs below as datafusion-examples/examples/bench.rs:

cargo build --release -p datafusion-examples --example bench
/usr/bin/time -l target/release/examples/bench split_off heavy 100
/usr/bin/time -l target/release/examples/bench new heavy 100

Results for the heavy scenario, run 100 times:

| Implementation | Total elapsed | Avg/iteration | Max RSS | Peak footprint |
|---|---|---|---|---|
| split_off | ~2055 ms | ~20.55 ms | ~185 MB | ~181 MB |
| new | ~592 ms | ~5.92 ms | ~137 MB | ~133 MB |
benchmark code
use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use std::time::Instant;

const ROWS_PER_RG: usize = 130_000;
const NUM_ROW_GROUPS: usize = 39;
const TOTAL_ROWS: usize = ROWS_PER_RG * NUM_ROW_GROUPS;
type BenchFn = fn(RowSelection, &[RowGroupMetaData]) -> ParquetAccessPlan;

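/// Cursor over the selectors of a file-level `RowSelection`. `take` hands out
/// at most `max_rows` rows at a time, splitting a selector that straddles a
/// row-group boundary.
struct OverallRowSelectionCursor {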
    selector_iter: std::vec::IntoIter<RowSelector>,
    current: Option<RowSelector>,
}

impl OverallRowSelectionCursor {
    fn new(selection: RowSelection) -> Self {
        let selectors: Vec<RowSelector> = selection.into();
        let mut selector_iter = selectors.into_iter();
        let current = selector_iter.next();
        Self {
            selector_iter,
            current,
        }
    }

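    /// Returns the next selector truncated to `max_rows`; any remainder of a
    /// truncated selector stays in `current` for the next call.
    #[inline]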
    fn take(&mut self, max_rows: usize) -> Option<RowSelector> {
        let sel = self.current?;
        let row_count = sel.row_count.min(max_rows);
        self.current = if row_count < sel.row_count {
            Some(RowSelector {
                row_count: sel.row_count - row_count,
                skip: sel.skip,
            })
        } else {
            self.selector_iter.next()
        };

        Some(RowSelector {
            row_count,
            skip: sel.skip,
        })
    }

    /// Rows not yet consumed; non-zero means the selection is longer than the file.
    fn remaining_rows(self) -> usize {
        self.current.map_or(0, |s| s.row_count)
            + self.selector_iter.map(|s| s.row_count).sum::<usize>()
    }
}

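/// Accumulates the selectors that fall inside one row group and tracks how
/// many of its rows were selected versus skipped.
struct RowGroupAccessBuilder {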
    selectors: Vec<RowSelector>,
    selected: usize,
    skipped: usize,
    remaining: usize,
}

impl RowGroupAccessBuilder {
    fn new(row_group_rows: usize) -> Self {
        Self {
            selectors: Vec::with_capacity(1),
            selected: 0,
            skipped: 0,
            remaining: row_group_rows,
        }
    }

    #[inline]
    fn push(&mut self, selector: RowSelector) {
        self.remaining -= selector.row_count;

        if selector.skip {
            self.skipped += selector.row_count;
        } else {
            self.selected += selector.row_count;
        }

        self.selectors.push(selector);
    }

    /// Collapses the row group to `Skip` or `Scan` when no per-row selection is needed.
    fn into_access(self) -> RowGroupAccess {
        if self.selected == 0 {
            RowGroupAccess::Skip
        } else if self.skipped == 0 {
            RowGroupAccess::Scan
        } else {
            RowGroupAccess::Selection(self.selectors.into())
        }
    }
}

fn schema_descr() -> SchemaDescPtr {
    use parquet::basic::Type as PhysicalType;
    use parquet::schema::types::Type as SchemaType;
    let field = SchemaType::primitive_type_builder("a", PhysicalType::INT32)
        .build()
        .unwrap();
    let schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![std::sync::Arc::new(field)])
        .build()
        .unwrap();
    std::sync::Arc::new(SchemaDescriptor::new(std::sync::Arc::new(schema)))
}

fn row_group_metadata() -> Vec<RowGroupMetaData> {
    let descr = schema_descr();
    (0..NUM_ROW_GROUPS)
        .map(|_| {
            let column = ColumnChunkMetaData::builder(descr.column(0))
                .set_num_values(ROWS_PER_RG as i64)
                .build()
                .unwrap();
            RowGroupMetaData::builder(descr.clone())
                .set_num_rows(ROWS_PER_RG as i64)
                .set_column_metadata(vec![column])
                .build()
                .unwrap()
        })
        .collect()
}

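/// Builds a sparse selection covering `TOTAL_ROWS`: `num_pairs` chunks, each
/// skipping all rows except its last one.
fn scattered_selection(num_pairs: usize) -> RowSelection {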
    let mut selectors: Vec<RowSelector> = Vec::with_capacity(num_pairs * 2);
    let chunk = TOTAL_ROWS / num_pairs;
    let mut acc = 0usize;
    for i in 0..num_pairs {
        let this = if i == num_pairs - 1 {
            TOTAL_ROWS - acc
        } else {
            chunk
        };
        let sel = this.min(1);
        let skp = this - sel;
        if skp > 0 {
            selectors.push(RowSelector::skip(skp));
        }
        if sel > 0 {
            selectors.push(RowSelector::select(sel));
        }
        acc += this;
    }
    RowSelection::from(selectors)
}

fn new_try_new(
    selection: RowSelection,
    row_group_meta_data: &[RowGroupMetaData],
) -> ParquetAccessPlan {
    let mut cursor = OverallRowSelectionCursor::new(selection);

    let mut selection_rows = 0usize;
    let mut file_rows = 0usize;

    let mut row_groups = Vec::with_capacity(row_group_meta_data.len());
    for rg_meta in row_group_meta_data {
        let rg_rows = rg_meta.num_rows() as usize;
        file_rows += rg_rows;

        let mut builder = RowGroupAccessBuilder::new(rg_rows);
        while builder.remaining > 0 {
            let Some(selector) = cursor.take(builder.remaining) else {
                break;
            };
            selection_rows += selector.row_count;
            builder.push(selector);
        }

        row_groups.push(builder.into_access());
    }

    selection_rows += cursor.remaining_rows();

    assert_eq!(selection_rows, file_rows, "NEW: row count mismatch");
    ParquetAccessPlan::new(row_groups)
}

fn split_off_try_new(
    selection: RowSelection,
    row_group_meta_data: &[RowGroupMetaData],
) -> ParquetAccessPlan {
    let mut remaining_selection = selection;

    let mut selection_rows = 0usize;
    let mut file_rows = 0usize;

    let mut row_groups = Vec::with_capacity(row_group_meta_data.len());
    for rg_meta in row_group_meta_data {
        let rg_rows = rg_meta.num_rows() as usize;
        file_rows += rg_rows;

        let group_selection = remaining_selection.split_off(rg_rows);
        let selected = group_selection.row_count();
        let skipped = group_selection.skipped_row_count();
        selection_rows += selected + skipped;

        let access = if selected == 0 {
            RowGroupAccess::Skip
        } else if skipped == 0 {
            RowGroupAccess::Scan
        } else {
            RowGroupAccess::Selection(group_selection)
        };
        row_groups.push(access);
    }

    selection_rows +=
        remaining_selection.row_count() + remaining_selection.skipped_row_count();

    assert_eq!(selection_rows, file_rows, "SPLIT_OFF: row count mismatch");
    ParquetAccessPlan::new(row_groups)
}

fn run_one(
    name: &str,
    bench_fn: BenchFn,
    label: &str,
    num_pairs: usize,
    iterations: usize,
) {
    let meta = row_group_metadata();
    let selection = scattered_selection(num_pairs);
    let s = selection.iter().count();
    let started = Instant::now();

    for _ in 0..iterations {
        let plan = bench_fn(selection.clone(), &meta);
        std::hint::black_box(plan);
    }

    println!(
        "{name} {label} selectors={s} iterations={iterations} elapsed={:?}",
        started.elapsed()
    );
}

fn implementation(value: &str) -> Option<(&'static str, BenchFn)> {
    match value {
        "new" => Some(("new", new_try_new)),
        "split_off" => Some(("split_off", split_off_try_new)),
        _ => None,
    }
}

fn scenario(value: &str) -> Option<(&'static str, usize)> {
    match value {
        "coarse" => Some(("coarse", 40)),
        "medium" => Some(("medium", 50_000)),
        "heavy" => Some(("heavy", 1_000_000)),
        _ => None,
    }
}

fn usage(program: &str) -> ! {
    eprintln!(
        "Usage: {program} <new|split_off> <coarse|medium|heavy> [iterations]\n\
         Example: {program} new heavy 1"
    );
    std::process::exit(2);
}

fn main() {
    let args = std::env::args().collect::<Vec<_>>();
    let program = args.first().map(String::as_str).unwrap_or("bench");

    if !(3..=4).contains(&args.len()) {
        usage(program);
    }

    let Some((name, bench_fn)) = implementation(&args[1]) else {
        usage(program);
    };
    let Some((label, num_pairs)) = scenario(&args[2]) else {
        usage(program);
    };
    let iterations = match args.get(3) {
        Some(value) => value.parse().unwrap_or_else(|_| usage(program)),
        None => 1,
    };
    if iterations == 0 {
        usage(program);
    }

    run_one(name, bench_fn, label, num_pairs, iterations);
}

Labels

core (Core DataFusion crate)
datasource (Changes to the datasource crate)

Development

Successfully merging this pull request may close these issues.

Support file-level Parquet RowSelection

2 participants